package io.feige.rpc.consumer.network.implement;

import static org.jboss.netty.channel.Channels.pipeline;
import io.feige.rpc.consumer.RpcConsumerService;
import io.feige.rpc.consumer.config.RemoteServiceConfig;
import io.feige.rpc.consumer.network.ConsumerConnectionService;
import io.feige.rpc.consumer.network.future.InvokeFuture;
import io.feige.rpc.consumer.network.listener.InvokeListener;
import io.feige.rpc.consumer.network.vo.RequestVo;
import io.feige.rpc.exception.ProviderUnavailableException;
import io.feige.rpc.protocol.Protocol;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerConnectionServiceImpl implements ConsumerConnectionService {

	private static final Logger logger=LoggerFactory.getLogger(ConsumerConnectionServiceImpl.class);
	
	private ClientBootstrap bootstrap; 
	
	private HashedWheelTimer idleTimer = new HashedWheelTimer(); 
	
	private AtomicLong seq=new AtomicLong(0);
	
	private Protocol protocol;
	//现在每个服务只建立一个连接，满足大多数情况，后继实现连接池
	private Map<String, Channel> channels=new ConcurrentHashMap<String, Channel>();

	public Map<Long, InvokeFuture<Object>> futrues=new ConcurrentHashMap<Long, InvokeFuture<Object>>();
	
	private Map<String, Queue<RequestVo>> requests=new ConcurrentHashMap<String, Queue<RequestVo>>();
	
	private RpcConsumerService rpcConsumerService;
	
	@Override
	public void start(RpcConsumerService rpcConsumerService) {
		this.rpcConsumerService=rpcConsumerService;
		this.protocol=rpcConsumerService.getProtocol();
        bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        bootstrap.setPipelineFactory(new ClientPipelineFactory()); 
        updateConnections(rpcConsumerService.getRemoteServiceConfigManager().getRemoteServiceConfigMap());
	}

	@Override
	public void stop() {
		bootstrap.releaseExternalResources();
	}
	
	@Override
	public void updateConnections(Map<String, RemoteServiceConfig> remoteServiceConfigMap) {
		
		//后继会实现配置动态刷新
		Map<String, SocketAddress> saMap=new HashMap<String, SocketAddress>();
		Iterator<String> ite=remoteServiceConfigMap.keySet().iterator();
		while(ite.hasNext()){
			String key=ite.next();
			RemoteServiceConfig config=remoteServiceConfigMap.get(key);
			for (SocketAddress socketAddress : config.getHosts()) {
				saMap.put(socketAddress.toString(), socketAddress);
				if (!channels.containsKey(socketAddress.toString())) {
					connect(socketAddress);
				}
			}
		}
		
		Iterator<String> cite = channels.keySet().iterator();
		while(cite.hasNext()){
			String key=cite.next();
			if(!saMap.containsKey(key)){
				removeChannels(key);
			}
		}

	}

	private void removeChannels(String key) {
		Channel ch = channels.remove(key);
		if (ch!=null) {
			if (ch.isConnected()) {
				ch.close();
			}
		}
		
		Queue<RequestVo> reqQueue = requests.get(key);
		if (reqQueue!=null) {
			RequestVo rv;
			while((rv=reqQueue.poll())!=null){
				if(rv.getFuture()!=null){
					rv.getFuture().setCause(new ProviderUnavailableException());
				}
			}
		}
	}

	 
	@Override
	public void removeConnection(Channel channel) {
		if (channel.getRemoteAddress()!=null) {
			removeChannels(channel.getRemoteAddress().toString());
		}
	}
	
	@Override
	public void connect(SocketAddress sa) {
		ChannelFuture future = bootstrap.connect(sa);
		future.addListener(new ConnectListener());
	}
 
	public class ConnectListener implements ChannelFutureListener {
        
	    public void operationComplete(ChannelFuture future){
	        if (!future.isSuccess()) {
	        	logger.warn("connect failed, {}", future.getCause().getMessage());                             
	            return;                                                                 
	        }
	        Channel channel = future.getChannel();
	        channels.put(channel.getRemoteAddress().toString(), channel); 
			Queue<RequestVo> requestsQueue = new ConcurrentLinkedQueue<RequestVo>();
			requests.put(channel.getRemoteAddress().toString(), requestsQueue);
	    }
	}


	@Override
	public boolean checkConnection(SocketAddress sa) {
		Channel ch = channels.get(sa.toString());
		return ch!=null&&ch.isConnected();
	}

	@Override
	public Object write(SocketAddress sa, String service, String method, Class<?>[] types, Object[] args, int timeout) {
		Long seq=getSeq();
		Object obj=protocol.getWriteObject(service, method, types, args, seq);
		InvokeFuture<Object> future=new InvokeFuture<Object>();
		futrues.put(seq, future);
		write(sa, obj, future);
		try {
			Object result=future.getResult(timeout, TimeUnit.SECONDS);
			return result;
		} catch (RuntimeException e) {
			throw e;
		}finally {
			futrues.remove(seq);
		}
	}
	
	@Override
	public void write(SocketAddress sa, String service, String method, Class<?>[] types, Object[] args, InvokeListener<?> listener) {
		
	}
	
	private void write(SocketAddress sa, Object obj, InvokeFuture<Object> future){
		Channel channel=getChannel(sa);
		if (channel!=null&&channel.isWritable()) {
			write(channel, obj, future);
		}else{
			RequestVo reqVo=new RequestVo();
			reqVo.setFuture(future);
			reqVo.setRequest(obj);
			requests.get(sa.toString()).offer(reqVo);
			checkRequestQueue(sa);
		}
	}
	
	private void write(Channel channel, Object obj, final InvokeFuture<Object> future){
		ChannelFuture cfuture = channel.write(obj);
		cfuture.addListener(new ChannelFutureListener() {
			
			@Override
			public void operationComplete(ChannelFuture rfuture) throws Exception {
				if(!rfuture.isSuccess()){
					future.setCause(rfuture.getCause());
				}
			}
		});

	}
	
	private void checkRequestQueue(SocketAddress sa) {
		Channel channel=getChannel(sa);
		if (channel!=null) {
			checkRequestQueue(channel);
		}
	}

	public void checkRequestQueue(Channel channel) {
		logger.debug("checkRequestQueue {}", channel);
		Queue<RequestVo> reqs = requests.get(channel.getRemoteAddress().toString());
		RequestVo rv;
		while ((rv=reqs.poll())!=null) {
			logger.debug("find request from the queue {}", channel);
			if (channel.isWritable()) {
				write(channel, rv.getRequest(), rv.getFuture());
			}else{
				reqs.offer(rv);
				break;
			}
		}
		 
	}

	private Channel getChannel(SocketAddress sa){
		return channels.get(sa.toString());
	}
	
	private Long getSeq(){
		return seq.getAndIncrement();
	}

	@Override
	public void received(Object result, Throwable e, Long seq) {
		if (seq!=null) {
			InvokeFuture<Object> future = futrues.remove(seq);
			if (future==null) {
				return;
			}
			if (e!=null) {
				future.setCause(e);
			}else{
				future.setResult(result);
			}
		}
	}
 
	public class ClientPipelineFactory implements ChannelPipelineFactory {
		 

		public ChannelPipeline getPipeline() throws Exception {
	        ChannelPipeline pipeline = pipeline();
	        pipeline.addLast("decoder", (SimpleChannelUpstreamHandler)protocol.getDecoder());
	        pipeline.addLast("encoder", (ChannelDownstreamHandler)protocol.getEncoder());
	        pipeline.addLast("timeout", new IdleStateHandler(idleTimer, 10, 0, 0));
			pipeline.addLast("idleHandler", (IdleStateAwareChannelHandler)protocol.getStateCheckChannelHandler(rpcConsumerService)); 
	        pipeline.addLast("handler", (SimpleChannelHandler)protocol.getInvokerResponseHandler(rpcConsumerService));
	        return pipeline;
	    }
	}
}
